package com.flink.wc.demo.demo_20240807;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class Demo02 {
    public static void main(String[] args) {
        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建表
        tableEnv.executeSql("CREATE TABLE source(\n" +
                "id INT, \n" +
                "ts BIGINT, \n"+
                "vc INT\n"+
                ")WITH(\n" +
                "    'connector' = 'datagen', \n" +
                "    'rows-per-second'='1', \n" +
                "    'fields.id.kind'='random', \n" +
                "    'fields.id.min'='1', \n" +
                "    'fields.id.max'='10', \n" +
                "    'fields.ts.kind'='sequence', \n" +
                "    'fields.ts.start'='1', \n" +
                "    'fields.ts.end'='1000000', \n" +
                "    'fields.vc.kind'='random', \n" +
                "    'fields.vc.min'='1', \n" +
                "    'fields.vc.max'='100'\n" +
                ");\n");

        tableEnv.executeSql("CREATE TABLE sink (\n" +
                "    id INT, \n" +
                "    sumVC INT \n" +
                ") WITH (\n" +
                "'connector' = 'print'\n" +
                ");\n");

        // 执行查询
        //    1.使用sql查询
        Table table = tableEnv.sqlQuery("select id, sum(vc) as sumVC from source where id>5 group by id;");
        //   把table对象注册成表名
        tableEnv.createTemporaryView("tmp", table);
        tableEnv.sqlQuery("select * from tmp where id>7");

        //    2.使用table api查询
//        Table source = tableEnv.from("source");
//        Table result = source
//                .where($("id").isGreater(5))
//                .groupBy($("id"))
//                .aggregate($("vc").sum().as("sumVC"))
//                .select($("id"), $("sumVC"));


        // 输出表
        // sql写法
        tableEnv.executeSql("insert into sink select * from tmp");
        // table api写法
//        result.executeInsert("sink");
    }
}